/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.persistence;

import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ReplyCache;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.txn.TxnHeader;

import java.io.File;
import java.io.IOException;
import java.util.List;

/**
 * Helper class layered on top of the transaction log ({@link TxnLog}) and
 * snapshot ({@link SnapShot}) implementations. It owns one of each, rooted in
 * versioned sub-directories of the directories handed to the constructor, and
 * exposes restore / save / append / truncate style operations on them.
 */
public class FileTxnSnapLog {
	// the directory containing the transaction logs
	File dataDir;
	// the directory containing the snapshots
	File snapDir;
	TxnLog txnLog;
	SnapShot snapLog;
	public final static int VERSION = 2;
	public final static String version = "version-";

	private static final Logger LOG = Logger.getLogger(FileTxnSnapLog.class);

	/**
	 * This listener helps the external apis calling restore to gather
	 * information while the data is being restored. It is invoked by
	 * {@link FileTxnSnapLog#restore(DataTree, PlayBackListener)} once for
	 * every header/record batch read from the transaction log, after that
	 * batch has been applied to the data tree.
	 */
	public interface PlayBackListener {
		void onTxnLoaded(TxnHeader[] hdr, Record[] rec);
	}

	/**
	 * the constructor which takes the datadir and snapdir. The directories
	 * actually used are the sub-directories named
	 * <code>version + VERSION</code> (i.e. "version-2") below the given ones;
	 * they are created if they do not exist yet.
	 * 
	 * @param dataDir
	 *            the parent of the transaction log directory
	 * @param snapDir
	 *            the parent of the snapshot directory
	 * @throws IOException
	 *             if one of the versioned directories does not exist and
	 *             cannot be created
	 */
	public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
		this.dataDir = new File(dataDir, version + VERSION);
		this.snapDir = new File(snapDir, version + VERSION);
		if (!this.dataDir.exists() && !this.dataDir.mkdirs()) {
			throw new IOException("Unable to create data directory "
					+ this.dataDir);
		}
		if (!this.snapDir.exists() && !this.snapDir.mkdirs()) {
			throw new IOException("Unable to create snap directory "
					+ this.snapDir);
		}
		txnLog = new FileTxnLog(this.dataDir);
		snapLog = new FileSnap(this.snapDir);
	}

	/**
	 * get the datadir used by this filetxn snap log
	 * 
	 * @return the (versioned) transaction log directory
	 */
	public File getDataDir() {
		return this.dataDir;
	}

	/**
	 * get the snap dir used by this filetxn snap log
	 * 
	 * @return the (versioned) snapshot directory
	 */
	public File getSnapDir() {
		return this.snapDir;
	}

	/**
	 * this function restores the server database after reading from the
	 * snapshots and transaction logs. The snapshot is deserialized into the
	 * data tree first; then every batch found in the transaction log, starting
	 * at <code>dt.lastProcessedZxid + 1</code>, is applied to the tree and
	 * reported to the listener. After the last batch
	 * {@link ReplyCache#rebuild()} is called.
	 * 
	 * @param dt
	 *            the datatree to be restored
	 * @param listener
	 *            the playback listener to run on the database restoration;
	 *            it is dereferenced for every batch, so it must not be null
	 *            when the log contains transactions
	 * @return the highest zxid restored; if the log iterator has no header at
	 *         all this is <code>dt.lastProcessedZxid</code> and
	 *         {@link ReplyCache#rebuild()} is not called
	 * @throws IOException
	 */
	public long restore(DataTree dt, PlayBackListener listener)
			throws IOException {
		snapLog.deserialize(dt);
		// a fresh reader over the log directory; named so that it does not
		// shadow the txnLog field used for appending
		FileTxnLog logReader = new FileTxnLog(dataDir);
		TxnIterator itr = logReader.read(dt.lastProcessedZxid + 1);
		long highestZxid = dt.lastProcessedZxid;
		TxnHeader[] hdr;
		while (true) {
			// iterator points to
			// the first valid txn when initialized
			hdr = itr.getHeader();
			if (hdr == null) {
				// empty logs
				return dt.lastProcessedZxid;
			}
			// only the last header of a batch is compared against the
			// running maximum
			TxnHeader lastHdr = hdr[hdr.length - 1];
			if (lastHdr.getZxid() < highestZxid && highestZxid != 0) {
				// out-of-order batch: keep the current maximum, just report it
				if (LOG.isDebugEnabled()) {
					LOG.debug(highestZxid + "(higestZxid) > "
							+ lastHdr.getZxid()
							+ "(next log) for type "
							+ lastHdr.getType());
				}
			} else {
				highestZxid = lastHdr.getZxid();
			}
			processTransaction(hdr, dt, itr.getTxn());
			listener.onTxnLoaded(hdr, itr.getTxn());
			if (!itr.next()) {
				break;
			}
		}
		ReplyCache.rebuild();
		return highestZxid;
	}

	/**
	 * process the transaction batch on the datatree. Unless the first header
	 * of the batch has type {@link ZooDefs.OpCode#prepareData}, a
	 * {@link ReplyHeader} is built for every header in the batch and handed
	 * to {@link ReplyCache#addReply}.
	 * 
	 * @param hdr
	 *            the headers of the transactions in the batch
	 * @param dt
	 *            the datatree to apply the transactions to
	 * @param txn
	 *            the transactions to be applied
	 */
	private void processTransaction(TxnHeader[] hdr, DataTree dt, Record[] txn) {
		DataTree.ProcessTxnResult[] rc = dt.processTxn(hdr, txn);
		if (hdr[0].getType() == ZooDefs.OpCode.prepareData) {
			return;
		}
		for (int i = 0; i < hdr.length; i++) {
			// NOTE(review): every reply in the batch carries the error code of
			// the FIRST result (rc[0]); confirm against DataTree.processTxn
			// whether rc[i].err was intended.
			ReplyHeader header = new ReplyHeader(hdr[i].getCxid(), hdr[i]
					.getNonce(), hdr[i].getType(), hdr[i].getZxid(),
					rc[0].err);
			ReplyCache.addReply(hdr[i].getClientId(), hdr[i].getCxid(),
					header, null, "response");
		}
	}

	/**
	 * the last logged zxid on the transaction logs, as reported by a fresh
	 * {@link FileTxnLog} opened on the data directory
	 * 
	 * @return the last logged zxid
	 */
	public long getLastLoggedZxid() {
		FileTxnLog logReader = new FileTxnLog(dataDir);
		return logReader.getLastLoggedZxid();
	}

	/**
	 * save the datatree into a snapshot file in the snapshot directory. The
	 * file name is derived from the tree's last processed zxid via
	 * {@link Util#makeSnapshotName}.
	 * 
	 * @param dataTree
	 *            the datatree to be serialized onto disk
	 * @throws IOException
	 */
	public void save(DataTree dataTree) throws IOException {
		long lastZxid = dataTree.lastProcessedZxid;
		String hexZxid = Long.toHexString(lastZxid);
		LOG.info("Snapshotting: " + hexZxid);
		File snapshot = new File(snapDir, Util.makeSnapshotName(lastZxid));
		snapLog.serialize(dataTree, snapshot);
		LOG.info("End Snapshotting: " + hexZxid);
	}

	/**
	 * truncate the transaction logs to the zxid specified, using a fresh
	 * {@link FileTxnLog} opened on the data directory
	 * 
	 * @param zxid
	 *            the zxid to truncate the logs to
	 * @return true if able to truncate the log, false if not
	 * @throws IOException
	 */
	public boolean truncateLog(long zxid) throws IOException {
		FileTxnLog logToTruncate = new FileTxnLog(dataDir);
		return logToTruncate.truncate(zxid);
	}

	/**
	 * the most recent snapshot in the snapshot directory
	 * 
	 * @return the file that contains the most recent snapshot
	 * @throws IOException
	 */
	public File findMostRecentSnapshot() throws IOException {
		FileSnap snaplog = new FileSnap(snapDir);
		return snaplog.findMostRecentSnapshot();
	}

	/**
	 * the n most recent snapshots
	 * 
	 * @param n
	 *            the number of recent snapshots
	 * @return the list of n most recent snapshots, with the most recent in
	 *         front
	 * @throws IOException
	 */
	public List<File> findNRecentSnapshots(int n) throws IOException {
		FileSnap snaplog = new FileSnap(snapDir);
		return snaplog.findNRecentSnapshots(n);
	}

	/**
	 * get the transaction log files for the given zxid. The current listing
	 * of the data directory is passed to {@link FileTxnLog#getLogFiles}
	 * together with the zxid.
	 * 
	 * @param zxid
	 *            the zxid used to select the log files
	 * @return the files selected by {@link FileTxnLog#getLogFiles} from the
	 *         data directory
	 */
	public File[] getSnapshotLogs(long zxid) {
		return FileTxnLog.getLogFiles(dataDir.listFiles(), zxid);
	}

	/**
	 * append the batch of requests to the transaction logs. The header and
	 * txn of every request are collected, in order, into parallel arrays
	 * which are appended in one call.
	 * 
	 * @param si
	 *            the requests to be appended
	 * @return the result of {@link TxnLog#append}: true iff something was
	 *         appended, otw false
	 * @throws IOException
	 */
	public boolean append(Request[] si) throws IOException {
		TxnHeader[] hdr = new TxnHeader[si.length];
		Record[] record = new Record[si.length];
		for (int i = 0; i < si.length; i++) {
			hdr[i] = si[i].hdr;
			record[i] = si[i].txn;
		}
		return txnLog.append(hdr, record);
	}

	/**
	 * commit the transaction of logs
	 * 
	 * @throws IOException
	 */
	public void commit() throws IOException {
		txnLog.commit();
	}

	/**
	 * roll the transaction logs
	 * 
	 * @throws IOException
	 */
	public void rollLog() throws IOException {
		txnLog.rollLog();
	}

	/**
	 * close the transaction log and the snapshot log. The snapshot log is
	 * closed even when closing the transaction log fails, so that a failure
	 * of the first close cannot leak the second resource.
	 * 
	 * @throws IOException
	 *             if closing either log fails; if both fail, the exception
	 *             from the snapshot log is the one that propagates
	 */
	public void close() throws IOException {
		try {
			txnLog.close();
		} finally {
			// must run regardless of the outcome above
			snapLog.close();
		}
	}
}
